package com.dc.schedule.server.service.client.impl;

import com.dc.schedule.api.model.SocketMessage;
import com.dc.schedule.server.model.ClientConnectionEvent;
import com.dc.schedule.server.model.ClientDisconnectEvent;
import com.dc.schedule.server.model.ClientExceptionEvent;
import com.dc.schedule.server.model.ClientMessageEvent;
import com.dc.schedule.server.service.client.ClientConnectionService;
import com.dc.schedule.server.service.client.ClientMessageConvertor;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

/**
 * <p>Descriptions...
 *
 * @author Diamon.Cheng
 * @date 2022/7/23.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class NettyClientConnectionServiceImpl extends ChannelInboundHandlerAdapter
        implements ClientConnectionService {
    private final ClientMessageConvertor clientMessageConvertor;
    
    private final Map<String, ChannelHandlerContext> clientsMap = new ConcurrentHashMap<>(3);
    
    private final BlockingQueue<ClientConnectionEvent> unprocessedMessages = new LinkedBlockingDeque<>();
    
    private String clientIdFromContext(ChannelHandlerContext ctx) {
        return ctx.channel().id().asLongText();
    }
    
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        final String clientId = clientIdFromContext(ctx);
        clientsMap.put(clientId, ctx);
        log.debug("[Netty]-client建立连接:{}", clientId);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final String clientId = clientIdFromContext(ctx);
        final ByteBuf buf = (ByteBuf) msg;
        final byte[] bytes = new byte[buf.readableBytes()];
        buf.readBytes(bytes);
        final String json = new String(bytes, StandardCharsets.UTF_8);
        log.debug("[Netty]-client消息:{}->{}", clientId, json);
        try {
            final SocketMessage socketMessage = clientMessageConvertor.decode(json);
            ClientMessageEvent clientMessageEvent = new ClientMessageEvent();
            clientMessageEvent.setInnerClientId(clientId);
            clientMessageEvent.setSocketMessage(socketMessage);
            unprocessedMessages.add(clientMessageEvent);
        } catch (Exception e) {
            log.error("消息解码失败", e);
            final ClientExceptionEvent clientExceptionEvent = new ClientExceptionEvent();
            clientExceptionEvent.setInnerClientId(clientId);
            clientExceptionEvent.setCause(e);
            unprocessedMessages.add(clientExceptionEvent);
        } finally {
            buf.release();
        }
    }
    
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        final String clientId = clientIdFromContext(ctx);
        clientsMap.remove(clientId);
        final ClientDisconnectEvent clientDisconnectEvent = new ClientDisconnectEvent();
        clientDisconnectEvent.setInnerClientId(clientId);
        unprocessedMessages.add(clientDisconnectEvent);
        log.debug("[Netty]-client断开连接:{}", clientId);
    }
    
    @Override
    public void sendMessage(String innerClientId, SocketMessage message) {
        final ChannelHandlerContext ctx = clientsMap.get(innerClientId);
        if (ctx == null) {
            throw new IllegalStateException("客户端不存在!");
        }
        final String json = clientMessageConvertor.encode(message);
        // netty 这里貌似不需要加锁
        ctx.writeAndFlush(Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(json, StandardCharsets.UTF_8)));
    }
    
    @Override
    public void disconnect(String innerClientId) {
        final ChannelHandlerContext ctx = clientsMap.get(innerClientId);
        ctx.close();
    }
    
    @Override
    public ClientConnectionEvent takeEvent() {
        try {
            return unprocessedMessages.poll(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.warn("读取消息时程序中断", e);
            return null;
        }
    }
    
    @Override
    public boolean isSharable() {
        return true;
    }
}
